/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.lvyh.lightframe.core.provider.transport;

import com.alibaba.fastjson.JSON;
import com.lvyh.lightframe.common.RpcContext;
import com.lvyh.lightframe.common.exception.RpcRuntimeException;
import com.lvyh.lightframe.common.ext.ExtensionLoaderFactory;
import com.lvyh.lightframe.common.util.ThreadPoolUtils;
import com.lvyh.lightframe.common.util.ThrowableUtil;
import com.lvyh.lightframe.core.cache.ServiceRegistryCache;
import com.lvyh.lightframe.core.filter.FilterChain;
import com.lvyh.lightframe.core.filter.ProviderFilterChain;
import com.lvyh.lightframe.core.invoke.request.Heartbeat;
import com.lvyh.lightframe.core.invoke.request.RpcRequest;
import com.lvyh.lightframe.core.invoke.response.RpcResponse;
import com.lvyh.lightframe.core.serialize.Serializer;
import com.lvyh.lightframe.core.util.SpringContextUtils;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ThreadPoolExecutor;


/**
 * Netty inbound handler that serves RPC calls carried in the body of HTTP requests.
 *
 * <p>Each aggregated {@link FullHttpRequest} is copied off the I/O thread and processed on a
 * worker pool. Two kinds of request are recognised:
 * <ul>
 *   <li>{@code /services} - answers with an HTML list of the entries in
 *       {@link ServiceRegistryCache#serviceCache};</li>
 *   <li>anything else - the body is deserialized into an {@link RpcRequest}, run through the
 *       {@link ProviderFilterChain} and the resulting {@link RpcResponse} is serialized back.</li>
 * </ul>
 *
 * <p>The handler is not annotated {@code @Sharable}, so Netty requires one instance per pipeline.
 */
public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private static final Logger logger = LoggerFactory.getLogger(NettyHttpServerHandler.class);

    /**
     * Worker pool that runs request processing off the Netty event loop.
     *
     * <p>Shared by all handler instances: because a new handler is created for every channel,
     * an instance-level pool would allocate one executor per connection and never shut it down.
     * NOTE(review): assumes {@code makeServerThreadPool} builds a new pool on each call - confirm.
     */
    private static final ThreadPoolExecutor SERVER_THREAD_POOL =
            ThreadPoolUtils.makeServerThreadPool(NettyServerBootstrap.class.getSimpleName(), 60, 300);

    /** Serializer used for both request bodies and response bodies. */
    private final Serializer serializer;

    /**
     * Creates a handler whose serializer is the extension named by {@link RpcContext#getSerializer()}.
     */
    public NettyHttpServerHandler() {
        serializer = ExtensionLoaderFactory.getExtensionLoader(Serializer.class).getExtension(RpcContext.getSerializer());
    }

    /**
     * Copies everything needed from the request and hands the work to the worker pool.
     *
     * @param ctx channel context used later to write the response
     * @param msg the aggregated HTTP request
     */
    @Override
    protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
        // SimpleChannelInboundHandler releases msg once this method returns, so the body,
        // URI and keep-alive flag are extracted here rather than on the worker thread.
        final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());
        final String uri = msg.uri();
        final boolean keepAlive = HttpUtil.isKeepAlive(msg);

        SERVER_THREAD_POOL.execute(new Runnable() {
            @Override
            public void run() {
                process(ctx, uri, requestBytes, keepAlive);
            }
        });
    }

    /**
     * Dispatches a request to the service listing or to the RPC path and writes the outcome.
     * Any exception is reported to the client as an {@link RpcResponse} carrying the error text.
     *
     * @param ctx          channel context to write the response to
     * @param uri          request URI
     * @param requestBytes copy of the request body
     * @param keepAlive    whether the client asked for a persistent connection
     */
    private void process(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive) {
        // Single-element holder so the RPC path can publish the request id for error reporting.
        String[] requestIdHolder = new String[1];
        try {
            if ("/services".equals(uri)) {
                writeResponse(ctx, keepAlive, renderServiceList());
            } else {
                handleRpcCall(ctx, uri, requestBytes, keepAlive, requestIdHolder);
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
            writeErrorResponse(ctx, keepAlive, requestIdHolder[0], e);
        }
    }

    /** Builds the UTF-8 encoded HTML list of registered services. */
    private byte[] renderServiceList() {
        StringBuilder html = new StringBuilder("<ul>");
        for (String serviceKey : ServiceRegistryCache.serviceCache.keySet()) {
            html.append("<li>").append(serviceKey).append(": ").append(ServiceRegistryCache.serviceCache.get(serviceKey)).append("</li>");
        }
        html.append("</ul>");
        return html.toString().getBytes(StandardCharsets.UTF_8);
    }

    /** Deserializes the body, runs the provider filter chain and writes the serialized result. */
    private void handleRpcCall(ChannelHandlerContext ctx, String uri, byte[] requestBytes, boolean keepAlive,
                               String[] requestIdHolder) throws Exception {
        if (requestBytes.length == 0) {
            throw new RpcRuntimeException("request data is empty.");
        }

        RpcRequest rpcRequest = (RpcRequest) serializer.deserialize(requestBytes, RpcRequest.class);

        logger.info("[NettyHttpServerHandler] server receive request, param:{}, uri:{}", JSON.toJSONString(rpcRequest), uri);
        requestIdHolder[0] = rpcRequest.getRequestId();

        if (Heartbeat.PING_REQ_ID.equalsIgnoreCase(rpcRequest.getRequestId())) {
            // Heartbeats are only logged; no HTTP response is written for them.
            logger.info("[NettyHttpServerHandler] server receive heart beat req.");
            return;
        }

        FilterChain filterChain = SpringContextUtils.getBean(ProviderFilterChain.class);
        RpcResponse response = filterChain.filter(rpcRequest);
        writeResponse(ctx, keepAlive, serializer.serialize(response));
    }

    /**
     * Sends an {@link RpcResponse} describing {@code cause}. If even that fails, the channel is
     * closed so the client is not left waiting for a response that will never arrive.
     */
    private void writeErrorResponse(ChannelHandlerContext ctx, boolean keepAlive, String requestId, Exception cause) {
        try {
            RpcResponse response = new RpcResponse();
            response.setRequestId(requestId);
            response.setErrorMsg(ThrowableUtil.toString(cause));
            writeResponse(ctx, keepAlive, serializer.serialize(response));
        } catch (Exception sendFailure) {
            logger.error("[NettyHttpServerHandler] failed to send error response, closing channel", sendFailure);
            ctx.close();
        }
    }

    /**
     * Writes {@code responseBytes} as a {@code 200 OK} response.
     *
     * @param ctx           channel context to write to
     * @param keepAlive     when {@code false} the connection is closed once the write completes
     * @param responseBytes response body
     */
    private void writeResponse(ChannelHandlerContext ctx, boolean keepAlive, byte[] responseBytes) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(responseBytes));
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=UTF-8");
        response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
        if (keepAlive) {
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
            ctx.writeAndFlush(response);
        } else {
            // The client did not ask for a persistent connection: announce and perform the close
            // instead of leaving the channel open until the idle timeout fires.
            response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
            ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
        }
    }

    /** Flushes any pending outbound data once the current read batch is complete. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /** Logs the failure and closes the channel. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        logger.error("[NettyHttpServerHandler] server error", cause);
        ctx.close();
    }

    /** Closes the channel on an {@link IdleStateEvent}; other events are passed on unchanged. */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            ctx.channel().close();
            logger.info("[NettyHttpServerHandler] server close an idle channel.");
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

}